This topic describes the parameters of the Hudi CDC feature and provides usage examples.
Background information
CDC (Change Data Capture) refers to the process of identifying and capturing changes made to data in database tables and delivering the changes to downstream systems for further processing. Hudi CDC allows you to use a Hudi table as a source and directly obtain the changed data.
Limits
The Hudi CDC feature is supported only by clusters of EMR-3.45.0 or a later version and clusters of EMR-5.11.0 or a later version, in which the Hudi version is 0.12.2.
Parameters
CDC write parameters
Parameter | Description |
hoodie.table.cdc.enabled | Specifies whether to enable CDC. Valid values: true and false. Default value: false. |
hoodie.table.cdc.supplemental.logging.mode | The storage mode of CDC files. Three levels are supported: op_key_only stores only the operation type and the record key of each change; data_before additionally stores the record image before the change; data_before_after additionally stores the record images both before and after the change. |
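For reference, the following minimal sketch (Scala) shows one way the write parameters might be passed through the DataFrame writer. The table name and field names are hypothetical, and it uses the data_before logging level, which the examples below do not cover:
// A minimal sketch, not tied to the examples below: enable CDC on write and
// persist the before-image of each changed record (data_before level).
val cdcWriteOpts = Map(
  "hoodie.table.name" -> "my_cdc_table",              // hypothetical table name
  "hoodie.datasource.write.recordkey.field" -> "id",  // hypothetical record key field
  "hoodie.datasource.write.precombine.field" -> "ts", // hypothetical precombine field
  "hoodie.table.cdc.enabled" -> "true",
  "hoodie.table.cdc.supplemental.logging.mode" -> "data_before"
)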
CDC read parameters
Parameter | Description |
hoodie.datasource.query.type | The query type. To use CDC, set this parameter to incremental. Default value: snapshot. |
hoodie.datasource.query.incremental.format | The incremental query format. To use CDC, set this parameter to cdc. Default value: latest_state. |
hoodie.datasource.read.begin.instanttime | The start time of the incremental query. |
hoodie.datasource.read.end.instanttime | The end time of the incremental query. This parameter is optional. |
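Similarly, a minimal read-side sketch (Scala) that combines all four read parameters; the instant times and table path are hypothetical, and an existing SparkSession named spark is assumed:
// A minimal sketch: an incremental query in the CDC format over a half-open
// (begin, end] interval. end.instanttime can be omitted to read up to the latest commit.
val cdcReadOpts = Map(
  "hoodie.datasource.query.type" -> "incremental",
  "hoodie.datasource.query.incremental.format" -> "cdc",
  "hoodie.datasource.read.begin.instanttime" -> "20230129000000000", // hypothetical instant
  "hoodie.datasource.read.end.instanttime" -> "20230130000000000"    // hypothetical instant, optional
)
val changes = spark.read.format("hudi").options(cdcReadOpts).load("/tmp/my_cdc_table") // hypothetical path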
Examples
Spark SQL
On the spark-defaults.conf tab of the Spark service configuration page, add a configuration item named spark.serializer and set its value to org.apache.spark.serializer.KryoSerializer. For more information about how to add a configuration item, see Add configuration items.
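After the change, the new entry in spark-defaults.conf looks like the following line (standard spark-defaults.conf key-value format):
spark.serializer org.apache.spark.serializer.KryoSerializer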
Run the following command to create a table:
create table hudi_cdc_test (
  id bigint,
  name string,
  ts bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
);
Run the following commands to write data to the table and query the table:
insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
select * from hudi_cdc_test;
The following information is returned:
20230129220605215 20230129220605215_0_0 1  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet 1 a1 1000
20230129220605215 20230129220605215_0_1 2  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet 2 a2 1001
Obtain the timestamp of the last commit from the .hoodie directory and perform a CDC query.
Obtain the timestamp of the last commit.
-rw-r--r--  1 zxy  staff   1.2K  1 29 22:06 20230129220605215.commit
-rw-r--r--  1 zxy  staff     0B  1 29 22:06 20230129220605215.commit.requested
-rw-r--r--  1 zxy  staff   798B  1 29 22:06 20230129220605215.inflight
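If you prefer not to inspect the .hoodie directory by hand, the same timestamp can also be obtained programmatically. The following sketch uses HoodieTableMetaClient in the same way as the DataFrame example below; the table path is hypothetical and an existing SparkSession named spark is assumed:
import org.apache.hudi.common.table.HoodieTableMetaClient

// Build a meta client for the table and read the latest instant from the active timeline.
val metaClient = HoodieTableMetaClient.builder()
  .setBasePath("/path/to/hudi_cdc_test")   // hypothetical table path
  .setConf(spark.sessionState.newHadoopConf())
  .build()
val lastCommit = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp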
Run the following command to perform a CDC query.
Because the query interval is left-open and right-closed, subtract 1 from the timestamp and use the result as the start time.
select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");
The following information is returned:
i 20230129220605215 NULL {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
i 20230129220605215 NULL {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
Run the following commands to write data again and query the table:
insert into hudi_cdc_test values (2, 'a2', 1002);
select * from hudi_cdc_test;
The following information is returned:
20230129220605215 20230129220605215_0_0 1  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet 1 a1 1000
20230129221304930 20230129221304930_0_1 2  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet 2 a2 1002
Obtain the timestamp of the last commit and subtract 1 in the same way as in Step 3, and then perform a CDC query. For example, if the obtained timestamp is 20230129221304930, run the following command to perform a CDC query:
select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");
The following information is returned:
u 20230129221304930 {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001} {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
DataFrame
Prepare the environment.
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

// Create a local SparkSession with the Hudi extension and Kryo serialization.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .withExtensions(new HoodieSparkSessionExtension)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._

val basePath = "/tmp/test/hudi_cdc_test"
// Write options: enable CDC and store only the operation type and record key in CDC files.
val writeOpts = Map(
  "hoodie.table.name" -> "hudi_cdc_test",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.cdc.enabled" -> "true",
  "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
)
// Read options: run incremental queries in the CDC format.
val readOpts = Map(
  "hoodie.datasource.query.type" -> "incremental",
  "hoodie.datasource.query.incremental.format" -> "cdc"
)
Write data by using df1.
val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
df1.write.format("hudi")
  .options(writeOpts)
  .mode(SaveMode.Append)
  .save(basePath)
df1.show(false)
The following information is returned:
+---+----+----+
|id |name|ts  |
+---+----+----+
|1  |a1  |1000|
|2  |a2  |1001|
+---+----+----+
Read the cdc1 data.
val metaClient = HoodieTableMetaClient.builder()
  .setBasePath(basePath)
  .setConf(spark.sessionState.newHadoopConf())
  .build()
// Timestamp of the latest commit on the active timeline.
val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdc1 = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
  .load(basePath)
cdc1.show(false)
The following information is returned:
+---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before|after                                                                                                                                                                                                                                                                      |
+---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|i  |20230128030951890|null  |{"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}|
|i  |20230128030951890|null  |{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}|
+---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Write data by using df2.
val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
df2.write.format("hudi")
  .options(writeOpts)
  .mode(SaveMode.Append)
  .save(basePath)
df2.show(false)
The following information is returned:
+---+----+----+
|id |name|ts  |
+---+----+----+
|2  |a2  |1002|
+---+----+----+
Read the cdc2 data.
val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdc2 = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
  .load(basePath)
cdc2.show(false)
The following information is returned:
+---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before                                                                                                                                                                                                                                                                                    |after                                                                                                                                                                                                                                                        |
+---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|u  |20230128031235363|{"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}|{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}|
+---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
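The examples above leave the right end of the query interval open. As a sketch that continues the session above, the optional hoodie.datasource.read.end.instanttime parameter can bound the query so that, under these assumptions, only the changes of the first commit are returned:
// Bound the incremental query to (timestamp1 - 1, timestamp1]: only the first commit.
val cdcFirstCommit = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
  .option("hoodie.datasource.read.end.instanttime", timestamp1)
  .load(basePath)
cdcFirstCommit.show(false)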